{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# default_exp models.mWDN"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# multilevel Wavelet Decomposition Network (mWDN)\n",
    "\n",
    "> This is an unofficial PyTorch implementation by Ignacio Oguiza - oguiza@gmail.com based on:\n",
    "\n",
    "* Wang, J., Wang, Z., Li, J., & Wu, J. (2018, July). Multilevel wavelet decomposition network for interpretable time series analysis. In Proceedings of the 24th ACM SIGKDD International Conference on Knowledge Discovery & Data Mining (pp. 2437-2446).\n",
    "* No official implementation found"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "from tsai.imports import *\n",
    "from tsai.models.layers import *\n",
    "from tsai.models.InceptionTimePlus import *\n",
    "from tsai.models.utils import build_model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "import pywt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "# This is an unofficial PyTorch implementation by Ignacio Oguiza - oguiza@gmail.com based on:\n",
    "\n",
    "# Wang, J., Wang, Z., Li, J., & Wu, J. (2018, July). Multilevel wavelet decomposition network for interpretable time series analysis. In Proceedings of the 24th ACM SIGKDD International Conference on Knowledge Discovery & Data Mining (pp. 2437-2446).\n",
    "# No official implementation found\n",
    "\n",
    "\n",
    "class WaveBlock(Module):\n",
    "    def __init__(self, c_in, c_out, seq_len, wavelet=None):\n",
    "        if wavelet is None:\n",
    "            self.h_filter = [-0.2304,0.7148,-0.6309,-0.028,0.187,0.0308,-0.0329,-0.0106]\n",
    "            self.l_filter = [-0.0106,0.0329,0.0308,-0.187,-0.028,0.6309,0.7148,0.2304]\n",
    "        else:\n",
    "            w = pywt.Wavelet(wavelet)\n",
    "            self.h_filter = w.dec_hi\n",
    "            self.l_filter = w.dec_lo\n",
    "\n",
    "        self.mWDN_H = nn.Linear(seq_len,seq_len)\n",
    "        self.mWDN_L = nn.Linear(seq_len,seq_len)\n",
    "        self.mWDN_H.weight = nn.Parameter(self.create_W(seq_len,False))\n",
    "        self.mWDN_L.weight = nn.Parameter(self.create_W(seq_len,True))\n",
    "        self.sigmoid = nn.Sigmoid()\n",
    "        self.pool = nn.AvgPool1d(2)\n",
    "\n",
    "    def forward(self, x):\n",
    "        hp_1 = self.sigmoid(self.mWDN_H(x))\n",
    "        lp_1 = self.sigmoid(self.mWDN_L(x))\n",
    "        hp_out = self.pool(hp_1)\n",
    "        lp_out = self.pool(lp_1)\n",
    "        all_out = torch.cat((hp_out, lp_out), dim=-1)\n",
    "        return lp_out, all_out\n",
    "    \n",
    "    def create_W(self, P, is_l, is_comp=False):\n",
    "        if is_l: filter_list = self.l_filter\n",
    "        else: filter_list = self.h_filter\n",
    "        list_len = len(filter_list)\n",
    "        max_epsilon = np.min(np.abs(filter_list))\n",
    "        if is_comp: weight_np = np.zeros((P, P))\n",
    "        else: weight_np = np.random.randn(P, P) * 0.1 * max_epsilon\n",
    "        for i in range(0, P):\n",
    "            filter_index = 0\n",
    "            for j in range(i, P):\n",
    "                if filter_index < len(filter_list):\n",
    "                    weight_np[i][j] = filter_list[filter_index]\n",
    "                    filter_index += 1\n",
    "        return tensor(weight_np)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# export\n",
    "\n",
    "class mWDN(Module):\n",
    "    def __init__(self, c_in, c_out, seq_len, levels=3, wavelet=None, base_arch=InceptionTimePlus, **kwargs):\n",
    "        self.levels=levels\n",
    "        self.blocks = nn.ModuleList()\n",
    "        for i in range(levels): self.blocks.append(WaveBlock(c_in, c_out, seq_len // 2 ** i, wavelet=wavelet))\n",
    "        self._model = build_model(base_arch, c_in, c_out, seq_len=seq_len, **kwargs)\n",
    "\n",
    "    def forward(self, x):\n",
    "        for i in range(self.levels):\n",
    "            x, out_ =  self.blocks[i](x)\n",
    "            if i == 0: out = out_ if i == 0 else torch.cat((out, out_), dim=-1)\n",
    "        out = self._model(out)\n",
    "        return out\n",
    "    \n",
    "\n",
    "class mWDNBlocks(Module):\n",
    "    def __init__(self, c_in, c_out, seq_len, levels=3, wavelet=None):\n",
    "        self.levels=levels\n",
    "        self.blocks = nn.ModuleList()\n",
    "        for i in range(levels): self.blocks.append(WaveBlock(c_in, c_out, seq_len // 2 ** i, wavelet=wavelet))\n",
    "\n",
    "    def forward(self, x):\n",
    "        for i in range(self.levels):\n",
    "            x, out_ =  self.blocks[i](x)\n",
    "            if i == 0: out = out_ if i == 0 else torch.cat((out, out_), dim=-1)\n",
    "        return out\n",
    "    \n",
    "\n",
    "class mWDNPlus(nn.Sequential):\n",
    "    def __init__(self, c_in, c_out, seq_len, levels=3, wavelet=None, base_model=None, base_arch=InceptionTimePlus, **kwargs):\n",
    "\n",
    "        if base_model is None:\n",
    "            base_model = build_model(base_arch, c_in, c_out, seq_len=seq_len, **kwargs)\n",
    "        blocks = mWDNBlocks(c_in, c_out, seq_len, levels=levels, wavelet=wavelet)\n",
    "        backbone = nn.Sequential(blocks, base_model.backbone)\n",
    "        super().__init__(OrderedDict([('backbone', backbone), ('head', base_model.head)]))\n",
    "        self.head_nf = base_model.head_nf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from tsai.models.TSTPlus import TSTPlus\n",
    "bs = 16\n",
    "c_in = 3\n",
    "seq_len = 12\n",
    "c_out = 2\n",
    "xb = torch.rand(bs, c_in, seq_len).to(default_device())\n",
    "test_eq(mWDN(c_in, c_out, seq_len).to(xb.device)(xb).shape, [bs, c_out])\n",
    "model = mWDNPlus(c_in, c_out, seq_len, fc_dropout=.5)\n",
    "test_eq(model.to(xb.device)(xb).shape, [bs, c_out])\n",
    "model = mWDNPlus(c_in, c_out, seq_len, base_arch=TSTPlus, fc_dropout=.5)\n",
    "test_eq(model.to(xb.device)(xb).shape, [bs, c_out])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(Sequential(\n",
       "   (0): GELU()\n",
       "   (1): Flatten(full=False)\n",
       "   (2): LinBnDrop(\n",
       "     (0): Dropout(p=0.5, inplace=False)\n",
       "     (1): Linear(in_features=1536, out_features=2, bias=True)\n",
       "   )\n",
       " ),\n",
       " 128)"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model.head, model.head_nf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#hide\n",
    "out = create_scripts()\n",
    "beep(out)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
